package com.gl.sass.mq.configure;

import com.gl.sass.mq.annotation.SaaSAutoMq;
import com.gl.sass.mq.consumer.BaseConsumer;
import com.gl.sass.mq.listener.ConsumerListener;
import com.gl.sass.mq.properties.ConsumerProperties;

import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 *  mq注解配置
 * @author xiehong
 *
 */
@Component
@Configuration
@Order
public class ConsumerAutoConfigure implements ApplicationContextAware {

    private static final Logger log = LoggerFactory.getLogger(ConsumerAutoConfigure.class);

    @Autowired
    private Environment env;

    private ApplicationContext applicationContext;

    /**
     * 	监听器处理类集合 key为队列名
     */
    private Map<String, ConsumerListener> consumerListenerMap = new HashMap<>();

    @PostConstruct
    public void scanConsumer() {
        initBaseConsumer();
    }

    public void initBaseConsumer() {
        Map<String, Object> baseConsumerMap = applicationContext.getBeansWithAnnotation(SaaSAutoMq.class);
        log.info("msg1=扫描到MQ消费者Bean总数,,size={}", baseConsumerMap.size());
        if (MapUtils.isNotEmpty(baseConsumerMap)) {
            baseConsumerMap.forEach(this::initBaseConsumerListener);
            log.info("msg1=当前共计初始化消费者总条数,,size={}", this.getConsumerListenerMap().size());
        }
    }

    public void initBaseConsumerListener(String k, Object v) {
        try {
            checkBaseConsumer(v);
            SaaSAutoMq glAutoMq = v.getClass().getAnnotation(SaaSAutoMq.class);
            BaseConsumer baseConsumer = (BaseConsumer)v;
            ConsumerListener consumerListener =
                new ConsumerListener(buildConsumerProperties(glAutoMq, baseConsumer), baseConsumer);
            this.getConsumerListenerMap()
                .put(consumerListener.getConsumerProperties().getGroupName(), consumerListener);
            consumerListener.start();
            log.info("msg1=启动MQ消费者,,properties={}" + consumerListener.getConsumerProperties().toString());
        } catch (Exception e) {
        	log.error("初始化消费者失败",e);
        }
    }

    public String getNamesrvAddr() throws Exception {
        String key = "mq.namesrvAddr";
        if (!this.env.containsProperty(key)) {
            throw new Exception("请配置RocketMQ服务器地址:" + key);
        }
        return getEnvProperties(key);
    }

    public void checkBaseConsumer(Object v) throws Exception {
        if (!(v instanceof BaseConsumer)) {
        	log.error( "RocketMQ队列监听失败, RocketMQ队列监听处理器必须继承实现抽象类: com.gl.carservice.mq.consumer.BaseConsumer 处理器类名: " +
                    v.getClass().getName());

            throw new Exception(
                "RocketMQ队列监听失败, RocketMQ队列监听处理器必须继承实现抽象类: com.gl.carservice.mq.consumer.BaseConsumer 处理器类名: " +
                v.getClass().getName());
        }
    }

    public ConsumerProperties buildConsumerProperties(SaaSAutoMq glAutoMq, BaseConsumer baseConsumer)
        throws Exception {
        ConsumerProperties consumerProperties = new ConsumerProperties();
        consumerProperties.setGroupName(getProperties(getGroupName(glAutoMq)));
        consumerProperties.setTopic(getProperties(glAutoMq.topic()));

        consumerProperties.setNamesrvAddr(getNamesrvAddr());
        consumerProperties.setConsumerMessageBatchMaxSize(glAutoMq.consumerMessageBatchMaxSize());
        consumerProperties.setPullBatchSize(glAutoMq.pullBatchSize());
        consumerProperties.setThreadMax(glAutoMq.threadMax());
        consumerProperties.setThreadMin(glAutoMq.threadMin());
        consumerProperties.setTag(buildTagSet(baseConsumer.getTagSet()));
        return consumerProperties;
    }

    public String buildTagSet(Set<String> tagSet) {
        if (tagSet.isEmpty()) {
            return "*";
        }
        return StringUtils.join(tagSet, " || ");
    }

    public String getGroupName(SaaSAutoMq glAutoMq) {
        String groupName = glAutoMq.groupName();
        return StringUtils.isEmpty(groupName) ?
            BaseConsumer.class.getSimpleName() + this.getConsumerListenerMap().size() :
            groupName;
    }

    public String getProperties(String key) {
        if (StringUtils.isNotBlank(key) && key.startsWith("${") && key.endsWith("}")) {
            return getEnvProperties(key.substring(2, key.length() - 1));
        }
        return key;
    }

    public String getEnvProperties(String key) {
        return this.env.getProperty(key).trim();
    }

    public Map<String, ConsumerListener> getConsumerListenerMap() {
        return consumerListenerMap;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

}
